Spark Streaming examples

Use a socket server to publish data from a file to port 9000; the TCP/IP example in section 2 below reads from it. A sketch of such a server follows, then the Spark script itself.
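A minimal sketch of such a server, run in a separate terminal before starting the streaming job in section 2. The file name sample_data.txt and the one-second delay between lines are assumptions; adjust them to your setup.

# socket_server.py -- hypothetical helper script, run in a separate terminal.
# Replays the lines of a text file to the first client that connects
# on localhost:9000 (the port the streaming example below listens on).
import socket
import time

HOST, PORT = "localhost", 9000
FILENAME = "sample_data.txt"   # assumption: any plain-text file, one record per line

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print("Waiting for a client on port", PORT)
    conn, addr = server.accept()
    print("Client connected from", addr)
    with conn, open(FILENAME) as f:
        for line in f:
            conn.sendall(line.encode("utf-8"))
            time.sleep(1)      # throttle: one line per second

Start the server first; otherwise socketTextStream in section 2 has nothing to connect to.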
# -*- coding: utf-8 -*-
import os
import sys

os.chdir("/home/cloudops/spark")
os.curdir

# Configure the environment. Set this up to the directory where
# Spark is installed
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark'

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the following paths to the system path.
# Please check your installation to make sure that these zip files
# actually exist. The names might change as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.9-src.zip"))

# Initialize the Spark context. Once this is done, the rest of the examples can run.
from pyspark import SparkContext
from pyspark import SparkConf

# Optionally configure Spark Settings
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")

conf.setAppName("Spark-Streaming")
# <pyspark.conf.SparkConf at 0x7fddbb2eb588>

# If a SparkContext is already running (here one created earlier with
# conf.setAppName("Spark-Test")), creating another one fails with:
# ValueError: Cannot run multiple SparkContexts at once;
# existing SparkContext(app=Spark-Test, master=local) created by __init__

# =====================================
# Initialize SparkContext. Run this only once!
# Otherwise you get the multiple-contexts error above.
# For streaming, create a Spark context with 2 local threads:
# * one thread runs the receiver that ingests the data
# * the other processes the received micro-batches

# If a context is already running, call sc.stop() before creating a new one.
# Alternatively, use sc = SparkContext.getOrCreate()
# instead of sc = SparkContext() (see the sketch below).

# sc.stop()

sc = SparkContext('local[2]', conf=conf)
# OK
# =====================================
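If a context may already be running (common when re-executing notebook cells), the getOrCreate variant mentioned above avoids the error. A sketch, assuming the same conf object; note that the master then has to be set on the conf itself:

# Alternative: reuse an existing SparkContext if one is already running,
# instead of failing with "Cannot run multiple SparkContexts at once".
# conf.setMaster("local[2]")
# sc = SparkContext.getOrCreate(conf=conf)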

from pyspark.streaming import StreamingContext

# =====================================
# 1. Streaming with simple data
# =====================================

vc = [[-0.1, -0.2],
      [0.1, 0.3],
      [1.1, 1.5],
      [0.9, 0.9]]

# Turn each row into its own one-partition RDD (parallelize) to feed a queue
dvc = [sc.parallelize(i, 1) for i in vc]

# Create a streaming context (the queue stream below dequeues one RDD per micro-batch)
# micro-batch interval = 2 sec
ssc = StreamingContext(sc, 2)

# Create an input DStream that feeds the queued RDDs into ssc
input_stream = ssc.queueStream(dvc)

def get_output(rdd):
    print(rdd.collect())

# Output operation: run get_output on every RDD of the input stream
input_stream.foreachRDD(get_output)
# After defining the stream, start it (watch the console output and the jobs in the Spark UI)
ssc.start()
# [-0.1, -0.2]
# [0.1, 0.3]
# [1.1, 1.5]
# [0.9, 0.9]
# []
# []
# []
# . . .
# stop the streaming
ssc.stop()
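Note that ssc.stop() also stops the underlying SparkContext by default, which is why the next example asks for a kernel restart. If you would rather keep sc alive, StreamingContext.stop takes flags for that; a sketch:

# Stop only the streaming context, keep the SparkContext for the next example,
# and let the in-flight micro-batch finish first.
# ssc.stop(stopSparkContext=False, stopGraceFully=True)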

# =====================================
# 2. Streaming with TCP/IP data
# =====================================

# This example needs a socket server (Java or Python) that publishes
# data from a file on port 9000 (see the server sketch at the top of this page).

# RESTART THE KERNEL and re-run the SparkContext setup above before continuing!

# Create a streaming context with a 3-second batch interval
streamContext = StreamingContext(sc, 3)

totalLines = 0
# Listen on port 9000; every micro-batch of received text becomes a new 'lines' RDD
lines = streamContext.socketTextStream("localhost", 9000)

# =====================================
# 1. Word count for every micro-batch of lines
words = lines.flatMap(lambda line: line.split(" "))

# Pair every word with a count of 1
pairs = words.map(lambda word: (word, 1))

# For every key (word), add up the counts within the micro-batch
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# pprint is the print output operation for streams; show the first 5 elements of each batch
wordCounts.pprint(5)
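The counts above are per micro-batch; they reset every 3 seconds. If a running total per word across batches is wanted, Spark's updateStateByKey can keep that state, at the cost of a checkpoint directory. A sketch (the checkpoint path is an assumption; left commented out so the recorded output below stays as-is):

# Optional: running word counts across micro-batches (stateful).
# streamContext.checkpoint("/tmp/spark-checkpoint")   # assumption: any writable path
#
# def update_count(new_values, last_state):
#     return sum(new_values) + (last_state or 0)
#
# runningCounts = pairs.updateStateByKey(update_count)
# runningCounts.pprint(5)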

# =====================================
# 2. Another job - Count lines
totalLines = 0
linesCount = 0

def computeMetrics(rdd):

    global totalLines
    global linesCount

    linesCount = rdd.count()
    totalLines += linesCount

    print(rdd.collect())
    print("Lines in RDD :", linesCount, " Total Lines:", totalLines)

lines.foreachRDD(computeMetrics)

# =====================================
# 3. Another job - Compute window metrics
def windowMetrics(rdd):
    print("Window RDD size:", rdd.count())

# 6 - window length in seconds (a multiple of the 3-sec batch interval above),
#     so each window covers 2 micro-batches (the current and the previous one)
# 3 - slide interval: move the window forward every 3 seconds
windowedRDD = lines.window(6, 3)
windowedRDD.foreachRDD(windowMetrics)

# =====================================
streamContext.start()
# -------------------------------------------
# Time: 2019-03-19 17:11:18
# -------------------------------------------
# ('da', 1)
# ('sucked...', 1)
# ('Client', 1)
# ('on', 1)
# ('127.0.0.1ERM', 1)
# ...
# ['Client on 127.0.0.1ERM da vinci code and it sucked...']
# Lines in RDD : 1  Total Lines: 1
# Window RDD size: 1

# . . .

# -------------------------------------------
# Time: 2019-03-19 17:11:57
# -------------------------------------------
# ('really', 1)
# ('like', 1)
# ('The', 1)
# ('Da', 1)
# ('Vinci', 1)
# ...

# ['I really like The Da Vinci Code.']
# Lines in RDD : 1  Total Lines: 20
# Window RDD size: 3

streamContext.stop()

print("Overall lines :", totalLines)
# Overall lines : 20